/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include "CCommon.h"
#include "CMessageExt.h"
#include "CMessageQueue.h"
#include "CPullConsumer.h"
#include "CPullResult.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <memory.h>
#include <unistd.h>
#endif

void thread_sleep(unsigned milliseconds) {
#ifdef _WIN32
  Sleep(milliseconds);
#else
  usleep(milliseconds * 1000);  // takes microseconds
#endif
}

int main(int argc, char* argv[]) {
  int i = 0, j = 0;
  int ret = 0;
  int size = 0;
  CMessageQueue* mqs = NULL;
  printf("PullConsumer Initializing....\n");
  CPullConsumer* consumer = CreatePullConsumer("Group_Consumer_Test");
  SetPullConsumerNameServerAddress(consumer, "172.17.0.2:9876");
  StartPullConsumer(consumer);
  printf("Pull Consumer Start...\n");
  for (i = 1; i <= 5; i++) {
    printf("FetchSubscriptionMessageQueues : %d times\n", i);
    ret = FetchSubscriptionMessageQueues(consumer, "T_TestTopic", &mqs, &size);
    if (ret != OK) {
      printf("Get MQ Queue Failed,ErrorCode:%d\n", ret);
    }
    printf("Get MQ Size:%d\n", size);
    for (j = 0; j < size; j++) {
      int noNewMsg = 0;
      long long tmpoffset = 0;
      printf("Pull Message For Topic:%s,Queue:%s,QueueId:%d\n", mqs[j].topic, mqs[j].brokerName, mqs[j].queueId);
      do {
        int k = 0;
        CPullResult pullResult = Pull(consumer, &mqs[j], "*", tmpoffset, 32);
        if (pullResult.pullStatus != E_BROKER_TIMEOUT) {
          tmpoffset = pullResult.nextBeginOffset;
        }
        printf("PullStatus:%d,MaxOffset:%lld,MinOffset:%lld,NextBegainOffset:%lld", pullResult.pullStatus,
               pullResult.maxOffset, pullResult.minOffset, pullResult.nextBeginOffset);
        switch (pullResult.pullStatus) {
          case E_FOUND:
            printf("Get Message Size:%d\n", pullResult.size);
            for (k = 0; k < pullResult.size; ++k) {
              printf("Got Message ID:%s,Body:%s\n", GetMessageId(pullResult.msgFoundList[k]),
                     GetMessageBody(pullResult.msgFoundList[k]));
            }
            break;
          case E_NO_MATCHED_MSG:
            noNewMsg = 1;
            break;
          default:
            noNewMsg = 0;
        }
        ReleasePullResult(pullResult);
        thread_sleep(100);
      } while (noNewMsg == 0);
      thread_sleep(1000);
    }
    thread_sleep(2000);
    ReleaseSubscriptionMessageQueue(mqs);
  }
  thread_sleep(5000);
  ShutdownPullConsumer(consumer);
  DestroyPullConsumer(consumer);
  printf("PullConsumer Shutdown!\n");
  return 0;
}
